Add durable control-plane state storage#175
Conversation
There was a problem hiding this comment.
Code Review
This pull request replaces various in-memory storage mechanisms (for rate limiting, admin sessions, and OAuth tokens) with a centralized, MongoDB-backed ControlPlaneStore to support multi-instance deployments. It also introduces stricter error handling in production, ensuring that durable storage is used. Review feedback focused on improving timestamp consistency by capturing the current time once per operation and adopting datetime.now(timezone.utc) over the deprecated utcnow(). Additionally, suggestions were made to pass the database-generated expiration timestamps back to the API responses to avoid redundant calculations and potential drift.
| def _create_mcp_temp_token(user_id: str) -> str: | ||
| """Create and store a temporary token for the user.""" | ||
| token = _generate_mcp_temp_token() | ||
| expires_at = datetime.utcnow() + timedelta(minutes=TEMP_TOKEN_TTL_MINUTES) | ||
|
|
||
| _mcp_temp_tokens[token] = { | ||
| "user_id": user_id, | ||
| "created_at": datetime.utcnow(), | ||
| "expires_at": expires_at, | ||
| "exchanged": False, | ||
| } | ||
|
|
||
| token, _ = control_plane_store.create_temp_token( | ||
| user_id=user_id, | ||
| ttl_minutes=TEMP_TOKEN_TTL_MINUTES, | ||
| prefix=TEMP_TOKEN_PREFIX, | ||
| ) | ||
| return token |
There was a problem hiding this comment.
The expires_at timestamp returned by control_plane_store.create_temp_token is currently ignored. Returning it from this helper allows the caller to use the exact timestamp stored in the database, avoiding redundant calculations and potential drift.
| def _create_mcp_temp_token(user_id: str) -> str: | |
| """Create and store a temporary token for the user.""" | |
| token = _generate_mcp_temp_token() | |
| expires_at = datetime.utcnow() + timedelta(minutes=TEMP_TOKEN_TTL_MINUTES) | |
| _mcp_temp_tokens[token] = { | |
| "user_id": user_id, | |
| "created_at": datetime.utcnow(), | |
| "expires_at": expires_at, | |
| "exchanged": False, | |
| } | |
| token, _ = control_plane_store.create_temp_token( | |
| user_id=user_id, | |
| ttl_minutes=TEMP_TOKEN_TTL_MINUTES, | |
| prefix=TEMP_TOKEN_PREFIX, | |
| ) | |
| return token | |
| def _create_mcp_temp_token(user_id: str) -> tuple[str, datetime]: | |
| """Create and store a temporary token for the user.""" | |
| return control_plane_store.create_temp_token( | |
| user_id=user_id, | |
| ttl_minutes=TEMP_TOKEN_TTL_MINUTES, | |
| prefix=TEMP_TOKEN_PREFIX, | |
| ) |
| try: | ||
| temp_token = _create_mcp_temp_token(user_id) | ||
| except RuntimeError: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | ||
| detail="Authentication storage unavailable", | ||
| ) | ||
|
|
||
| return MCPTempTokenResponse( | ||
| temp_token=temp_token, | ||
| expires_in=TEMP_TOKEN_TTL_MINUTES * 60, | ||
| expires_at=_mcp_temp_tokens[temp_token]["expires_at"] | ||
| expires_at=datetime.utcnow() + timedelta(minutes=TEMP_TOKEN_TTL_MINUTES) | ||
| ) |
There was a problem hiding this comment.
Use the expires_at timestamp returned by _create_mcp_temp_token instead of recalculating it. This ensures consistency between the response and the stored state.
| try: | |
| temp_token = _create_mcp_temp_token(user_id) | |
| except RuntimeError: | |
| raise HTTPException( | |
| status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | |
| detail="Authentication storage unavailable", | |
| ) | |
| return MCPTempTokenResponse( | |
| temp_token=temp_token, | |
| expires_in=TEMP_TOKEN_TTL_MINUTES * 60, | |
| expires_at=_mcp_temp_tokens[temp_token]["expires_at"] | |
| expires_at=datetime.utcnow() + timedelta(minutes=TEMP_TOKEN_TTL_MINUTES) | |
| ) | |
| try: | |
| temp_token, expires_at = _create_mcp_temp_token(user_id) | |
| except RuntimeError: | |
| raise HTTPException( | |
| status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | |
| detail="Authentication storage unavailable", | |
| ) | |
| return MCPTempTokenResponse( | |
| temp_token=temp_token, | |
| expires_in=TEMP_TOKEN_TTL_MINUTES * 60, | |
| expires_at=expires_at | |
| ) |
| import secrets | ||
| import string | ||
| import time | ||
| from datetime import datetime, timedelta |
| def create_temp_token(self, user_id: str, ttl_minutes: int, prefix: str = "xm-temp-") -> Tuple[str, datetime]: | ||
| token = _random_token(prefix, 32) | ||
| expires_at = datetime.utcnow() + timedelta(minutes=ttl_minutes) | ||
| token_hash = _hash_secret(token) | ||
| doc = { | ||
| "token_hash": token_hash, | ||
| "user_id": user_id, | ||
| "created_at": datetime.utcnow(), | ||
| "expires_at": expires_at, | ||
| } |
There was a problem hiding this comment.
Capture the current time once at the start of the method. This ensures that created_at and expires_at are perfectly consistent and avoids multiple calls to the clock. Additionally, prefer datetime.now(timezone.utc) as utcnow() is deprecated.
| def create_temp_token(self, user_id: str, ttl_minutes: int, prefix: str = "xm-temp-") -> Tuple[str, datetime]: | |
| token = _random_token(prefix, 32) | |
| expires_at = datetime.utcnow() + timedelta(minutes=ttl_minutes) | |
| token_hash = _hash_secret(token) | |
| doc = { | |
| "token_hash": token_hash, | |
| "user_id": user_id, | |
| "created_at": datetime.utcnow(), | |
| "expires_at": expires_at, | |
| } | |
| def create_temp_token(self, user_id: str, ttl_minutes: int, prefix: str = "xm-temp-") -> Tuple[str, datetime]: | |
| now = datetime.now(timezone.utc) | |
| token = _random_token(prefix, 32) | |
| expires_at = now + timedelta(minutes=ttl_minutes) | |
| token_hash = _hash_secret(token) | |
| doc = { | |
| "token_hash": token_hash, | |
| "user_id": user_id, | |
| "created_at": now, | |
| "expires_at": expires_at, | |
| } |
| def create_admin_session(self, user: Dict[str, Any], ttl_hours: int = 24) -> str: | ||
| token = _hash_secret(f"{user.get('username', 'admin')}:{secrets.token_hex(32)}:{time.time()}") | ||
| expires_at = datetime.utcnow() + timedelta(hours=ttl_hours) | ||
| doc = { | ||
| "session_hash": token, | ||
| "user": user, | ||
| "created_at": datetime.utcnow(), | ||
| "expires_at": expires_at, | ||
| } |
There was a problem hiding this comment.
Capture the current time once to ensure consistency between the session token generation, created_at, and expires_at fields.
| def create_admin_session(self, user: Dict[str, Any], ttl_hours: int = 24) -> str: | |
| token = _hash_secret(f"{user.get('username', 'admin')}:{secrets.token_hex(32)}:{time.time()}") | |
| expires_at = datetime.utcnow() + timedelta(hours=ttl_hours) | |
| doc = { | |
| "session_hash": token, | |
| "user": user, | |
| "created_at": datetime.utcnow(), | |
| "expires_at": expires_at, | |
| } | |
| def create_admin_session(self, user: Dict[str, Any], ttl_hours: int = 24) -> str: | |
| now = datetime.now(timezone.utc) | |
| token = _hash_secret(f"{user.get('username', 'admin')}:{secrets.token_hex(32)}:{now.timestamp()}") | |
| expires_at = now + timedelta(hours=ttl_hours) | |
| doc = { | |
| "session_hash": token, | |
| "user": user, | |
| "created_at": now, | |
| "expires_at": expires_at, | |
| } |
|
Hi @arbazkhan971 please have a look on the gemini suggestions |
|
| Filename | Overview |
|---|---|
| src/database/control_plane_store.py | New MongoDB-backed store for MCP temp tokens, OAuth auth codes, admin sessions, and rate limits; admin sessions store the bearer token directly (not a hash of it), and synchronous PyMongo calls are made from async contexts |
| src/api/dependencies.py | Switches rate limiter to control-plane store and adds RuntimeError→503 handling for API key validation; enforce_rate_limit is missing the same RuntimeError catch, leaving rate-limited endpoints with an unhandled 500 on DB failure |
| src/api/routes/admin.py | Migrates admin sessions to control-plane store; logout and WebSocket paths handle RuntimeError, but admin_login does not catch RuntimeError from create_admin_session, giving a 500 on DB failure |
| src/api/routes/auth.py | Migrates MCP temp tokens and OAuth auth codes to control-plane store; all call-sites consistently catch RuntimeError and return 503 |
| src/database/api_key_store.py | Adds _require_durable_storage guard to all write/read paths; in production, in-memory fallback raises RuntimeError instead of silently serving stale data |
Comments Outside Diff (2)
-
src/api/dependencies.py, line 300 (link)Unhandled
RuntimeErrorfrom rate-limit storage_rate_limiter.check()propagatesRuntimeErrorraised bycontrol_plane_store.check_rate_limit()when MongoDB is unavailable, butenforce_rate_limithas no try/except around the call. Every rate-limited endpoint will return an unhandled 500 instead of a clean 503 or a fail-open response when the database goes down after startup. Every other MongoDB-backed call in this file (e.g.validate_api_key) is already wrapped with aRuntimeError → 503translation. -
src/api/routes/admin.py, line 135-138 (link)create_admin_sessionRuntimeError not caught in admin loginIf MongoDB is unavailable (either never connected, or goes down after startup),
control_plane_store.create_admin_session()raisesRuntimeError. This is unhandled here, so admin login returns a 500 instead of a clean 503. All other newcontrol_plane_storecall-sites inauth.pyandapi_keys.pywrap analogous calls intry/except RuntimeErrorblocks — this one was missed.
Reviews (1): Last reviewed commit: "Add durable control-plane state storage" | Re-trigger Greptile
| def create_admin_session(self, user: Dict[str, Any], ttl_hours: int = 24) -> str: | ||
| token = _hash_secret(f"{user.get('username', 'admin')}:{secrets.token_hex(32)}:{time.time()}") | ||
| expires_at = datetime.utcnow() + timedelta(hours=ttl_hours) | ||
| doc = { | ||
| "session_hash": token, | ||
| "user": user, | ||
| "created_at": datetime.utcnow(), | ||
| "expires_at": expires_at, | ||
| } | ||
|
|
||
| if self._in_memory: | ||
| self._require_durable_storage() | ||
| _in_memory_admin_sessions[token] = doc | ||
| return token | ||
|
|
||
| try: | ||
| self.admin_sessions.insert_one(doc) | ||
| return token | ||
| except Exception as exc: | ||
| logger.error("Failed to create admin session: %s", exc) | ||
| raise RuntimeError("Failed to create admin session") from exc |
There was a problem hiding this comment.
Admin session token stored verbatim — missing second-level hash
For temp tokens and auth codes, the design stores sha256(plaintext_token) in MongoDB and returns the plaintext to the client. On lookup the client value is hashed before querying, so a DB read gives nothing directly usable.
create_admin_session breaks that pattern: the value assigned to token is itself a SHA-256 digest, and the same value is stored in the session_hash field and returned to the client. Anyone with read access to the admin_sessions collection can copy any session_hash value and use it as a valid bearer credential immediately — no reversal needed. The field name implies it should hold hash(token), not token itself.
| def check_rate_limit(self, identity: str, max_requests: int, window_seconds: int = 60) -> tuple[bool, int]: | ||
| window_key = int(time.time() // window_seconds) | ||
| window_expires_at = datetime.utcnow() + timedelta(seconds=window_seconds) | ||
|
|
||
| if self._in_memory: | ||
| self._require_durable_storage() | ||
| bucket = _in_memory_rate_limits.get(identity) | ||
| if not bucket or bucket.get("window_key") != window_key: | ||
| bucket = {"window_key": window_key, "count": 0} | ||
| if bucket["count"] >= max_requests: | ||
| return False, 0 | ||
| bucket["count"] += 1 | ||
| _in_memory_rate_limits[identity] = bucket | ||
| return True, max_requests - bucket["count"] | ||
|
|
||
| try: | ||
| from pymongo import ReturnDocument | ||
|
|
||
| doc = self.rate_limits.find_one_and_update( | ||
| {"identity": identity, "window_key": window_key}, | ||
| { | ||
| "$setOnInsert": { | ||
| "identity": identity, | ||
| "window_key": window_key, | ||
| "count": 0, | ||
| "window_started_at": datetime.utcnow(), | ||
| "window_expires_at": window_expires_at, | ||
| }, | ||
| "$inc": {"count": 1}, | ||
| }, | ||
| upsert=True, | ||
| return_document=ReturnDocument.AFTER, | ||
| ) | ||
| count = int(doc.get("count", 0)) | ||
| if count > max_requests: | ||
| return False, 0 | ||
| return True, max_requests - count | ||
| except Exception as exc: | ||
| logger.error("Failed to check rate limit: %s", exc) | ||
| raise RuntimeError("Failed to check rate limit") from exc |
There was a problem hiding this comment.
Synchronous PyMongo call inside an
async def blocks the event loop
check_rate_limit is a regular (sync) method, and _ControlPlaneRateLimiter.check() calls it directly from an async def without run_in_executor. Every rate-limited request will block the entire asyncio event loop for the duration of the MongoDB round-trip (tens of milliseconds under normal conditions, much longer under load). The old _SlidingWindowRateLimiter had the same shape but only touched an in-memory dict, which is near-instant.
/claim #161
Summary
Validation
uv run --python /Users/arbaz/.local/bin/python3.11 pytest -q tests/unit/test_control_plane_store.py tests/unit/test_database_stores.py tests/api/test_dependencies_and_routes.pygit diff --checkNotes